package org.luosl.webmagicx.scheduler

import java.util.concurrent.locks.ReentrantLock

import com.alibaba.fastjson.JSON
import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import org.apache.http.annotation.GuardedBy
import org.luosl.webmagicx.conf.PropType.{intType, strType}
import org.luosl.webmagicx.conf.{SpiderConf, XmlProps}
import redis.clients.jedis.{JedisPool, Protocol}
import us.codecraft.webmagic.{Request, Task}
import org.luosl.webmagicx.conf.MatcherConverters._
import us.codecraft.webmagic.scheduler.component.DuplicateRemover
import us.codecraft.webmagic.utils.JsonUtil

import org.luosl.webmagicx.utils.ResourceUtils._

/**
  * 基于redis的优先级调度器
  * Created by luosl on 2018/3/15.
  */
class PriorityRedisScheduler(sc:SpiderConf, task:Task, props:XmlProps) extends AbstractScheduler(sc, task, props) with DuplicateRemover{

  /** Configured priority levels as a set, used for membership checks when pushing. */
  private val prioritySet:Set[Long] = allPriority.toSet

  /** Serializes `poll` calls issued from this JVM. */
  private val pollLock:ReentrantLock = new ReentrantLock

  // Connection settings, each read from the "value" attribute of the matching property.
  // Unset values fall back to Jedis' Protocol defaults; password and clientName fall back to null.
  val host:String = props.valueOrDefault("host", "value")(strType)(Protocol.DEFAULT_HOST)
  val port:Int = props.valueOrDefault("port", "value")(intType)(Protocol.DEFAULT_PORT)
  val timeout:Int = props.valueOrDefault("timeout", "value")(intType)(Protocol.DEFAULT_TIMEOUT)
  val password:String = props.valueOrNull("password", "value")(strType)
  val database:Int = props.valueOrDefault("database", "value")(intType)(Protocol.DEFAULT_DATABASE)
  val clientName:String = props.valueOrNull("clientName", "value")(strType)

  /**
    * Jedis connection pool (default pool settings) used by every Redis operation in this class.
    */
  protected val pool: JedisPool = {
    new JedisPool(
      new GenericObjectPoolConfig(),
      host,
      port,
      timeout,
      password,
      database,
      clientName
    )
  }

  // Register this scheduler as its own DuplicateRemover: URL de-duplication uses a Redis set.
  this.setDuplicateRemover(this)

  /** Deletes the task's de-duplication set, so every URL is treated as new again. */
  override def resetDuplicateCheck(task: Task): Unit = withClose(pool.getResource)(_.del(getSetKey(task)))

  /**
    * Records the request URL in the task's de-duplication set.
    *
    * @return true if the URL was already in the set (SADD added nothing), false otherwise
    */
  override def isDuplicate(request: Request, task: Task): Boolean = withClose(pool.getResource)(_.sadd(getSetKey(task), request.getUrl) == 0)

  /**
    * Appends the request, serialized as JSON, to the tail of the queue for its priority.
    *
    * A request whose priority is not one of the configured levels goes to the priority-0 queue.
    * NOTE(review): if 0 is not in `allPriority`, that queue is never polled or counted, so such
    * requests would be stranded. Confirm that 0 is always configured.
    */
  override protected def pushWhenNoDuplicate(request: Request, task: Task): Unit = {
    val priority: Long =
      if (prioritySet.contains(request.getPriority)) request.getPriority else 0L
    withClose(pool.getResource)(_.rpush(getQueueKey(task, priority), JsonUtil.toJSONString(request)))
  }

  /** Redis key of the task's URL de-duplication set. */
  protected def getSetKey(task: Task): String = PriorityRedisScheduler.set_prefix + task.getUUID

  /** Redis key of the task's request list for the given priority level. */
  protected def getQueueKey(task: Task, priority:Long): String = s"${PriorityRedisScheduler.queue_prefix}priority_${priority}_${task.getUUID}"

  /**
    * Total number of queued requests across all configured priority queues.
    *
    * Iterates with an iterator so equal per-queue counts are never merged, whatever collection
    * type `allPriority` is. The total saturates at Int.MaxValue instead of wrapping.
    */
  override def getLeftRequestsCount(task: Task): Int = withClose(pool.getResource) { jedis =>
    val total: Long = allPriority.iterator
      .map(priority => jedis.llen(getQueueKey(task, priority)).longValue())
      .sum
    saturatedInt(total)
  }

  /**
   * Number of URLs recorded in the task's de-duplication set, saturating at Int.MaxValue.
   */
  override def getTotalRequestsCount(task: Task): Int = withClose(pool.getResource) { jedis =>
    saturatedInt(jedis.scard(getSetKey(task)).longValue())
  }

  /**
    * Gets the next request to crawl, trying the priority queues in the order of `allPriority`.
    *
    * Each queue is popped directly with LPOP instead of checking LLEN first. Another consumer
    * can therefore no longer empty a queue between the check and the pop, which used to make
    * this return null while later queues still held requests. It also saves one round trip.
    *
    * @param task the task of spider
    * @return the next request, or null if every priority queue is empty
    */
  @GuardedBy("pollLock")
  override def poll(task: Task): Request = {
    withLock(pollLock){ _ =>
      withClose(pool.getResource){ jedis =>
        // The iterator is lazy: queues after the first non-empty one are not touched.
        allPriority.iterator
          .map(priority => Option(jedis.lpop(getQueueKey(task, priority))))
          .collectFirst { case Some(jsonStr) => JSON.parseObject(jsonStr, classOf[Request]) }
          .orNull
      }
    }
  }

  /** Narrows a Redis count to Int, saturating at Int.MaxValue instead of wrapping negative. */
  private def saturatedInt(count: Long): Int = math.min(count, Int.MaxValue.toLong).toInt
}

object PriorityRedisScheduler {

  /** Key prefix of the Redis set holding a task's already-seen URLs (de-duplication). */
  val set_prefix: String = "webmagicx_set_"

  /** Key prefix of the Redis lists holding a task's queued requests, one list per priority. */
  val queue_prefix: String = "webmagicx_queue_"
}
